Explainers
ImageLIME
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
lime = (ImageLIME()
.setModel(model)
.setOutputCol("weights")
.setInputCol("image")
.setCellSize(150.0)
.setModifier(50.0)
.setNumSamples(500)
.setTargetCol("probability")
.setTargetClassesCol("top2pred")
.setSamplingFraction(0.7))
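A minimal usage sketch for the Python explainer above, assuming a hypothetical image_df that already holds the image column and a top2pred column listing the classes to explain (for example, derived from a prior scoring pass with the model):
# Hypothetical input: image_df carries "image" plus "top2pred"; ImageLIME runs the
# model on perturbed superpixels internally and writes per-superpixel weights.
explained = lime.transform(image_df)
explained.select("weights").show(truncate=False)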
import com.microsoft.azure.synapse.ml.explainers._
import com.microsoft.azure.synapse.ml.onnx._
import spark.implicits._
val model = (new ONNXModel())
val lime = (new ImageLIME()
.setModel(model)
.setOutputCol("weights")
.setInputCol("image")
.setCellSize(150.0)
.setModifier(50.0)
.setNumSamples(500)
.setTargetCol("probability")
.setTargetClassesCol("top2pred")
.setSamplingFraction(0.7))
Python API: ImageLIME | Scala API: ImageLIME | Source: ImageLIME |
ImageSHAP
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
shap = (
ImageSHAP()
.setModel(model)
.setOutputCol("shaps")
.setSuperpixelCol("superpixels")
.setInputCol("image")
.setCellSize(150.0)
.setModifier(50.0)
.setNumSamples(500)
.setTargetCol("probability")
.setTargetClassesCol("top2pred")
)
import com.microsoft.azure.synapse.ml.explainers._
import com.microsoft.azure.synapse.ml.onnx._
import spark.implicits._
val model = (new ONNXModel())
val shap = (new ImageSHAP()
.setModel(model)
.setOutputCol("shaps")
.setSuperpixelCol("superpixels")
.setInputCol("image")
.setCellSize(150.0)
.setModifier(50.0)
.setNumSamples(500)
.setTargetCol("probability")
.setTargetClassesCol("top2pred")
)
Python API: ImageSHAP | Scala API: ImageSHAP | Source: ImageSHAP |
TabularLIME
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
data = spark.createDataFrame([
(-6.0, 0),
(-5.0, 0),
(5.0, 1),
(6.0, 1)
], ["col1", "label"])
lime = (TabularLIME()
.setModel(model)
.setInputCols(["col1"])
.setOutputCol("weights")
.setBackgroundData(data)
.setKernelWidth(0.001)
.setNumSamples(1000)
.setTargetCol("probability")
.setTargetClasses([0, 1]))
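A short usage sketch, assuming instances is a hypothetical DataFrame of rows to explain with the same col1 input column:
# Hypothetical rows to explain; the background data above defines the sampling distribution.
instances = spark.createDataFrame([(-5.5,), (5.5,)], ["col1"])
lime.transform(instances).select("col1", "weights").show(truncate=False)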
import com.microsoft.azure.synapse.ml.explainers._
import com.microsoft.azure.synapse.ml.onnx._
import spark.implicits._
val model = (new ONNXModel())
val data = Seq(
(-6.0, 0),
(-5.0, 0),
(5.0, 1),
(6.0, 1)
).toDF("col1", "label")
val lime = (new TabularLIME()
.setInputCols(Array("col1"))
.setOutputCol("weights")
.setBackgroundData(data)
.setKernelWidth(0.001)
.setNumSamples(1000)
.setModel(model)
.setTargetCol("probability")
.setTargetClasses(Array(0, 1)))
Python API: TabularLIME | Scala API: TabularLIME | Source: TabularLIME |
TabularSHAP
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
data = spark.createDataFrame([
(-5.0, "a", -5.0, 0),
(-5.0, "b", -5.0, 0),
(5.0, "a", 5.0, 1),
(5.0, "b", 5.0, 1)
]*100, ["col1", "col2", "col3", "label"])
shap = (TabularSHAP()
.setInputCols(["col1", "col2", "col3"])
.setOutputCol("shapValues")
.setBackgroundData(data)
.setNumSamples(1000)
.setModel(model)
.setTargetCol("probability")
.setTargetClasses([1]))
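A usage sketch, assuming instances is a hypothetical DataFrame carrying the three input columns:
# Hypothetical rows to explain; Kernel SHAP perturbs them against the background data.
instances = spark.createDataFrame([(5.0, "a", 5.0)], ["col1", "col2", "col3"])
shap.transform(instances).select("shapValues").show(truncate=False)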
import com.microsoft.azure.synapse.ml.explainers._
import com.microsoft.azure.synapse.ml.onnx._
import spark.implicits._
val model = (new ONNXModel())
val data = (1 to 100).flatMap(_ => Seq(
(-5d, "a", -5d, 0),
(-5d, "b", -5d, 0),
(5d, "a", 5d, 1),
(5d, "b", 5d, 1)
)).toDF("col1", "col2", "col3", "label")
val shap = (new TabularSHAP()
.setInputCols(Array("col1", "col2", "col3"))
.setOutputCol("shapValues")
.setBackgroundData(data)
.setNumSamples(1000)
.setModel(model)
.setTargetCol("probability")
.setTargetClasses(Array(1)))
Python API: TabularSHAP | Scala API: TabularSHAP | Source: TabularSHAP |
TextLIME
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
lime = (TextLIME()
.setModel(model)
.setInputCol("text")
.setTargetCol("prob")
.setTargetClasses([1])
.setOutputCol("weights")
.setTokensCol("tokens")
.setSamplingFraction(0.7)
.setNumSamples(1000))
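A usage sketch, assuming text_df is a hypothetical DataFrame with a text column to explain:
# Hypothetical sentences; TextLIME tokenizes them into "tokens" and assigns one weight per token.
text_df = spark.createDataFrame([("this is a good movie",)], ["text"])
lime.transform(text_df).select("tokens", "weights").show(truncate=False)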
import com.microsoft.azure.synapse.ml.explainers._
import com.microsoft.azure.synapse.ml.onnx._
import spark.implicits._
val model = (new ONNXModel())
val lime = (new TextLIME()
.setModel(model)
.setInputCol("text")
.setTargetCol("prob")
.setTargetClasses(Array(1))
.setOutputCol("weights")
.setTokensCol("tokens")
.setSamplingFraction(0.7)
.setNumSamples(1000))
Python API: TextLIME | Scala API: TextLIME | Source: TextLIME |
TextSHAP
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
shap = (TextSHAP()
.setModel(model)
.setInputCol("text")
.setTargetCol("prob")
.setTargetClasses([1])
.setOutputCol("weights")
.setTokensCol("tokens")
.setNumSamples(1000))
import com.microsoft.azure.synapse.ml.explainers._
import com.microsoft.azure.synapse.ml.onnx._
import spark.implicits._
val model = (new ONNXModel())
val shap = (new TextSHAP()
.setModel(model)
.setInputCol("text")
.setTargetCol("prob")
.setTargetClasses(Array(1))
.setOutputCol("weights")
.setTokensCol("tokens")
.setNumSamples(1000))
Python API: TextSHAP | Scala API: TextSHAP | Source: TextSHAP |
VectorLIME
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
df = spark.createDataFrame([
([0.2729799734928408, -0.4637273304253777, 1.565593782147994], 4.541185129673482),
([1.9511879801376864, 1.495644437589599, -0.4667847796501322], 0.19526424470709836)
], ["features", "label"])
lime = (VectorLIME()
.setModel(model)
.setBackgroundData(df)
.setInputCol("features")
.setTargetCol("label")
.setOutputCol("weights")
.setNumSamples(1000))
import com.microsoft.azure.synapse.ml.explainers._
import spark.implicits._
import breeze.linalg.{*, DenseMatrix => BDM}
import breeze.stats.distributions.Rand
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
val d1 = 3
val d2 = 1
val coefficients: BDM[Double] = new BDM(d1, d2, Array(1.0, -1.0, 2.0))
val df = {
val nRows = 100
val intercept: Double = math.random()
val x: BDM[Double] = BDM.rand(nRows, d1, Rand.gaussian)
val y = x * coefficients + intercept
val xRows = x(*, ::).iterator.toSeq.map(dv => Vectors.dense(dv.toArray))
val yRows = y(*, ::).iterator.toSeq.map(dv => dv(0))
xRows.zip(yRows).toDF("features", "label")
}
val model: LinearRegressionModel = new LinearRegression().fit(df)
val lime = (new VectorLIME()
.setModel(model)
.setBackgroundData(df)
.setInputCol("features")
.setTargetCol(model.getPredictionCol)
.setOutputCol("weights")
.setNumSamples(1000))
Python API: VectorLIME | Scala API: VectorLIME | Source: VectorLIME |
VectorSHAP
- Python
- Scala
from synapse.ml.explainers import *
from synapse.ml.onnx import ONNXModel
model = ONNXModel()
shap = (VectorSHAP()
.setInputCol("features")
.setOutputCol("shapValues")
.setNumSamples(1000)
.setModel(model)
.setTargetCol("probability")
.setTargetClasses([1]))
import com.microsoft.azure.synapse.ml.explainers._
import spark.implicits._
import breeze.linalg.{*, DenseMatrix => BDM, DenseVector => BDV}
import breeze.stats.distributions.RandBasis
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
val randBasis = RandBasis.withSeed(123)
val m: BDM[Double] = BDM.rand[Double](1000, 5, randBasis.gaussian)
val l: BDV[Double] = m(*, ::).map {
row =>
if (row(2) + row(3) > 0.5) 1d else 0d
}
val data = m(*, ::).iterator.zip(l.valuesIterator).map {
case (f, l) => (f.toSpark, l)
}.toSeq.toDF("features", "label")
val model = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("label")
.fit(data)
val shap = (new VectorSHAP()
.setInputCol("features")
.setOutputCol("shapValues")
.setBackgroundData(data)
.setNumSamples(1000)
.setModel(model)
.setTargetCol("probability")
.setTargetClasses(Array(1)))
val infer = Seq(
Tuple1(Vectors.dense(1d, 1d, 1d, 1d, 1d))
).toDF("features")
val predicted = model.transform(infer)
shap.transform(predicted).show()
Python API: VectorSHAP | Scala API: VectorSHAP | Source: VectorSHAP |
Featurize
DataConversion
- Python
- Scala
from synapse.ml.featurize import *
df = spark.createDataFrame([
(True, 1, 2, 3, 4, 5.0, 6.0, "7", "8.0"),
(False, 9, 10, 11, 12, 14.5, 15.5, "16", "17.456"),
(True, -127, 345, 666, 1234, 18.91, 20.21, "100", "200.12345")
], ["bool", "byte", "short", "int", "long", "float", "double", "intstring", "doublestring"])
dc = (DataConversion()
.setCols(["byte"])
.setConvertTo("boolean"))
dc.transform(df).show()
import com.microsoft.azure.synapse.ml.featurize._
import spark.implicits._
val df = Seq(
(true: Boolean, 1: Byte, 2: Short, 3: Integer, 4: Long, 5.0F, 6.0, "7", "8.0"),
(false, 9: Byte, 10: Short, 11: Integer, 12: Long, 14.5F, 15.5, "16", "17.456"),
(true, -127: Byte, 345: Short, Short.MaxValue + 100, (Int.MaxValue).toLong + 100, 18.91F, 20.21, "100", "200.12345"))
.toDF("bool", "byte", "short", "int", "long", "float", "double", "intstring", "doublestring")
val dc = (new DataConversion()
.setCols(Array("byte"))
.setConvertTo("boolean"))
dc.transform(df).show()
Python API: DataConversion | Scala API: DataConversion | Source: DataConversion |
IndexToValue
- Python
- Scala
from synapse.ml.featurize import *
df = spark.createDataFrame([
(-3, 24, 0.32534, True, "piano"),
(1, 5, 5.67, False, "piano"),
(-3, 5, 0.32534, False, "guitar")
], ["int", "long", "double", "bool", "string"])
df2 = ValueIndexer().setInputCol("string").setOutputCol("string_cat").fit(df).transform(df)
itv = (IndexToValue()
.setInputCol("string_cat")
.setOutputCol("string_noncat"))
itv.transform(df2).show()
import com.microsoft.azure.synapse.ml.featurize._
import spark.implicits._
val df = Seq[(Int, Long, Double, Boolean, String)](
(-3, 24L, 0.32534, true, "piano"),
(1, 5L, 5.67, false, "piano"),
(-3, 5L, 0.32534, false, "guitar")).toDF("int", "long", "double", "bool", "string")
val df2 = new ValueIndexer().setInputCol("string").setOutputCol("string_cat").fit(df).transform(df)
val itv = (new IndexToValue()
.setInputCol("string_cat")
.setOutputCol("string_noncat"))
itv.transform(df2).show()
Python API: IndexToValue | Scala API: IndexToValue | Source: IndexToValue |
Featurize Text
MultiNGram
- Python
- Scala
from synapse.ml.featurize.text import *
from pyspark.ml.feature import Tokenizer
dfRaw = spark.createDataFrame([
(0, "Hi I"),
(1, "I wish for snow today"),
(2, "we Cant go to the park, because of the snow!"),
(3, ""),
(4, "1 2 3 4 5 6 7 8 9")
], ["label", "sentence"])
dfTok = (Tokenizer()
.setInputCol("sentence")
.setOutputCol("tokens")
.transform(dfRaw))
mng = (MultiNGram()
.setLengths([1, 3, 4])
.setInputCol("tokens")
.setOutputCol("ngrams"))
mng.transform(dfTok).show()
import com.microsoft.azure.synapse.ml.featurize.text._
import org.apache.spark.ml.feature.Tokenizer
import spark.implicits._
val dfRaw = (Seq(
(0, "Hi I"),
(1, "I wish for snow today"),
(2, "we Cant go to the park, because of the snow!"),
(3, ""),
(4, (1 to 10).map(_.toString).mkString(" ")))
.toDF("label", "sentence"))
val dfTok = (new Tokenizer()
.setInputCol("sentence")
.setOutputCol("tokens")
.transform(dfRaw))
val mng = (new MultiNGram()
.setLengths(Array(1, 3, 4))
.setInputCol("tokens")
.setOutputCol("ngrams"))
mng.transform(dfTok).show()
Python API: MultiNGram | Scala API: MultiNGram | Source: MultiNGram |
PageSplitter
- Python
- Scala
from synapse.ml.featurize.text import *
df = spark.createDataFrame([
("words words words wornssaa ehewjkdiw weijnsikjn xnh", ),
("s s s s s s", ),
("hsjbhjhnskjhndwjnbvckjbnwkjwenbvfkjhbnwevkjhbnwejhkbnvjkhnbndjkbnd", ),
("hsjbhjhnskjhndwjnbvckjbnwkjwenbvfkjhbnwevkjhbnwejhkbnvjkhnbndjkbnd 190872340870271091309831097813097130i3u709781", ),
("", ),
(None, )
], ["text"])
ps = (PageSplitter()
.setInputCol("text")
.setMaximumPageLength(20)
.setMinimumPageLength(10)
.setOutputCol("pages"))
ps.transform(df).show()
import com.microsoft.azure.synapse.ml.featurize.text._
import spark.implicits._
val df = Seq(
"words words words wornssaa ehewjkdiw weijnsikjn xnh",
"s s s s s s",
"hsjbhjhnskjhndwjnbvckjbnwkjwenbvfkjhbnwevkjhbnwejhkbnvjkhnbndjkbnd",
"hsjbhjhnskjhndwjnbvckjbnwkjwenbvfkjhbnwevkjhbnwejhkbnvjkhnbndjkbnd " +
"190872340870271091309831097813097130i3u709781",
"",
null
).toDF("text")
val ps = (new PageSplitter()
.setInputCol("text")
.setMaximumPageLength(20)
.setMinimumPageLength(10)
.setOutputCol("pages"))
ps.transform(df).show()
Python API: PageSplitter | Scala API: PageSplitter | Source: PageSplitter |
Image
UnrollImage
- Python
- Scala
from synapse.ml.image import *
from azure.storage.blob import *
# images = (spark.read.format("image")
# .option("dropInvalid", True)
# .load("wasbs://datasets@mmlspark.blob.core.windows.net/LIME/greyscale.jpg"))
# rit = (ResizeImageTransformer()
# .setOutputCol("out")
# .setHeight(15)
# .setWidth(10))
# preprocessed = rit.transform(images)
unroll = (UnrollImage()
.setInputCol("out")
.setOutputCol("final"))
# unroll.transform(preprocessed).show()
import com.microsoft.azure.synapse.ml.image._
import spark.implicits._
val images = (spark.read.format("image")
.option("dropInvalid", true)
.load("wasbs://datasets@mmlspark.blob.core.windows.net/LIME/greyscale.jpg"))
val rit = (new ResizeImageTransformer()
.setOutputCol("out")
.setHeight(15)
.setWidth(10))
val preprocessed = rit.transform(images)
val unroll = (new UnrollImage()
.setInputCol(rit.getOutputCol)
.setOutputCol("final"))
unroll.transform(preprocessed).show()
Python API: UnrollImage | Scala API: UnrollImage | Source: UnrollImage |
UnrollBinaryImage
- Python
- Scala
from synapse.ml.image import *
unroll = (UnrollBinaryImage()
.setInputCol("input_col")
.setOutputCol("final"))
import com.microsoft.azure.synapse.ml.image._
import spark.implicits._
val unroll = (new UnrollBinaryImage()
.setInputCol("input_col")
.setOutputCol("final"))
Python API: UnrollBinaryImage | Scala API: UnrollBinaryImage | Source: UnrollBinaryImage |
SuperpixelTransformer
- Python
- Scala
from synapse.ml.image import *
spt = (SuperpixelTransformer()
.setInputCol("images"))
import com.microsoft.azure.synapse.ml.image._
val spt = (new SuperpixelTransformer()
.setInputCol("images"))
Python API: SuperpixelTransformer | Scala API: SuperpixelTransformer | Source: SuperpixelTransformer |
IO
HTTPTransformer
- Python
- Scala
from synapse.ml.io.http import *
from pyspark.sql.functions import udf, col
from requests import Request
def world_bank_request(country):
return Request("GET", "http://api.worldbank.org/v2/country/{}?format=json".format(country))
df = (spark.createDataFrame([("br",), ("usa",)], ["country"])
.withColumn("request", http_udf(world_bank_request)(col("country"))))
ht = (HTTPTransformer()
.setConcurrency(3)
.setInputCol("request")
.setOutputCol("response"))
ht.transform(df).show()
import com.microsoft.azure.synapse.ml.io.http._
val ht = (new HTTPTransformer()
.setConcurrency(3)
.setInputCol("request")
.setOutputCol("response"))
Python API: HTTPTransformer | Scala API: HTTPTransformer | Source: HTTPTransformer |
SimpleHTTPTransformer
- Python
- Scala
from synapse.ml.io.http import *
from pyspark.sql.types import StringType, StructType
sht = (SimpleHTTPTransformer()
.setInputCol("data")
.setOutputParser(JSONOutputParser()
.setDataType(StructType().add("blah", StringType())))
.setUrl("PUT_YOUR_URL")
.setOutputCol("results")
.setConcurrency(3))
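A hedged usage sketch, assuming the placeholder URL points to a service that accepts the JSON-serialized rows and responds with an object containing a blah field:
# Hypothetical payloads; each row in "data" is sent to the configured URL and the JSON
# response is parsed into the schema declared above.
df = spark.createDataFrame([("payload one",), ("payload two",)], ["data"])
sht.transform(df).select("results").show(truncate=False)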
import com.microsoft.azure.synapse.ml.io.http._
import org.apache.spark.sql.types.{StringType, StructType}
val sht = (new SimpleHTTPTransformer()
.setInputCol("data")
.setOutputParser(new JSONOutputParser()
.setDataType(new StructType().add("blah", StringType)))
.setUrl("PUT_YOUR_URL")
.setOutputCol("results")
.setConcurrency(3))
Python API: SimpleHTTPTransformer | Scala API: SimpleHTTPTransformer | Source: SimpleHTTPTransformer |
JSONInputParser
- Python
- Scala
from synapse.ml.io.http import *
jsonIP = (JSONInputParser()
.setInputCol("data")
.setOutputCol("out")
.setUrl("PUT_YOUR_URL"))
import com.microsoft.azure.synapse.ml.io.http._
val jsonIP = (new JSONInputParser()
.setInputCol("data")
.setOutputCol("out")
.setUrl("PUT_YOUR_URL"))
Python API: JSONInputParser | Scala API: JSONInputParser | Source: JSONInputParser |
JSONOutputParser
- Python
- Scala
from synapse.ml.io.http import *
from pyspark.sql.types import StringType, StructType
jsonOP = (JSONOutputParser()
.setDataType(StructType().add("foo", StringType()))
.setInputCol("unparsedOutput")
.setOutputCol("parsedOutput"))
import com.microsoft.azure.synapse.ml.io.http._
import org.apache.spark.sql.types.{StringType, StructType}
val jsonOP = (new JSONOutputParser()
.setDataType(new StructType().add("foo", StringType))
.setInputCol("unparsedOutput")
.setOutputCol("parsedOutput"))
Python API: JSONOutputParser | Scala API: JSONOutputParser | Source: JSONOutputParser |
StringOutputParser
- Python
- Scala
from synapse.ml.io.http import *
sop = (StringOutputParser()
.setInputCol("unparsedOutput")
.setOutputCol("out"))
import com.microsoft.azure.synapse.ml.io.http._
val sop = (new StringOutputParser()
.setInputCol("unparsedOutput")
.setOutputCol("out"))
Python API: StringOutputParser | Scala API: StringOutputParser | Source: StringOutputParser |
CustomInputParser
- Python
- Scala
from synapse.ml.io.http import *
cip = (CustomInputParser()
.setInputCol("data")
.setOutputCol("out"))
import com.microsoft.azure.synapse.ml.io.http._
import org.apache.http.client.methods.HttpPost
val cip = (new CustomInputParser()
.setInputCol("data")
.setOutputCol("out")
.setUDF({ x: Int => new HttpPost(s"http://$x") }))
Python API: CustomInputParser | Scala API: CustomInputParser | Source: CustomInputParser |
CustomOutputParser
- Python
- Scala
from synapse.ml.io.http import *
cop = (CustomOutputParser()
.setInputCol("unparsedOutput")
.setOutputCol("out"))
import com.microsoft.azure.synapse.ml.io.http._
val cop = (new CustomOutputParser()
.setInputCol("unparsedOutput")
.setOutputCol("out"))
Python API: CustomOutputParser | Scala API: CustomOutputParser | Source: CustomOutputParser |
Stages
Cacher
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, "guitars", "drums"),
(1, "piano", "trumpet"),
(2, "bass", "cymbals"),
(3, "guitars", "drums"),
(4, "piano", "trumpet"),
(5, "bass", "cymbals"),
(6, "guitars", "drums"),
(7, "piano", "trumpet"),
(8, "bass", "cymbals"),
(9, "guitars", "drums"),
(10, "piano", "trumpet"),
(11, "bass", "cymbals")
], ["numbers", "words", "more"]))
cacher = Cacher()
cacher.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = Seq(
(0, "guitars", "drums"),
(1, "piano", "trumpet"),
(2, "bass", "cymbals"),
(3, "guitars", "drums"),
(4, "piano", "trumpet"),
(5, "bass", "cymbals"),
(6, "guitars", "drums"),
(7, "piano", "trumpet"),
(8, "bass", "cymbals"),
(9, "guitars", "drums"),
(10, "piano", "trumpet"),
(11, "bass", "cymbals")
).toDF("numbers", "words", "more")
val cacher = new Cacher()
cacher.transform(df).show()
Python API: Cacher | Scala API: Cacher | Source: Cacher |
DropColumns
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, 0, "guitars", "drums", 1, True),
(1, 1, "piano", "trumpet", 2, False),
(2, 2, "bass", "cymbals", 3, True)
], ["numbers", "doubles", "words", "more", "longs", "booleans"]))
dc = DropColumns().setCols([])
dc.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, 0.toDouble, "guitars", "drums", 1.toLong, true),
(1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
(2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
.toDF("numbers", "doubles", "words", "more", "longs", "booleans"))
val dc = new DropColumns().setCols(Array())
dc.transform(df).show()
Python API: DropColumns | Scala API: DropColumns | Source: DropColumns |
EnsembleByKey
- Python
- Scala
from synapse.ml.stages import *
from pyspark.ml.feature import VectorAssembler
scoreDF = (spark.createDataFrame([
(0, "foo", 1.0, .1),
(1, "bar", 4.0, -2.0),
(1, "bar", 0.0, -3.0)
], ["label1", "label2", "score1", "score2"]))
va = VectorAssembler().setInputCols(["score1", "score2"]).setOutputCol("v1")
scoreDF2 = va.transform(scoreDF)
ebk = EnsembleByKey().setKeys(["label1"]).setCols(["score1"])
ebk.transform(scoreDF2).show()
import com.microsoft.azure.synapse.ml.stages._
import org.apache.spark.ml.feature.VectorAssembler
val scoreDF = (Seq(
(0, "foo", 1.0, .1),
(1, "bar", 4.0, -2.0),
(1, "bar", 0.0, -3.0))
.toDF("label1", "label2", "score1", "score2"))
val va = new VectorAssembler().setInputCols(Array("score1", "score2")).setOutputCol("v1")
val scoreDF2 = va.transform(scoreDF)
val ebk = new EnsembleByKey().setKey("label1").setCol("score1")
ebk.transform(scoreDF2).show()
Python API: EnsembleByKey | Scala API: EnsembleByKey | Source: EnsembleByKey |
Explode
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, ["guitars", "drums"]),
(1, ["piano"]),
(2, [])
], ["numbers", "words"]))
explode = Explode().setInputCol("words").setOutputCol("exploded")
explode.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, Seq("guitars", "drums")),
(1, Seq("piano")),
(2, Seq()))
.toDF("numbers", "words"))
val explode = new Explode().setInputCol("words").setOutputCol("exploded")
explode.transform(df).show()
Python API: Explode | Scala API: Explode | Source: Explode |
Lambda
- Python
- Scala
from synapse.ml.stages import *
from pyspark.sql.types import StringType, StructType
df = (spark.createDataFrame([
(0, 0.0, "guitars", "drums", 1, True),
(1, 1.0, "piano", "trumpet", 2, False),
(2, 2.0, "bass", "cymbals", 3, True)
], ["numbers", "doubles", "words", "more", "longs", "booleans"]))
def transformFunc(df):
    return df.select("numbers")
def transformSchemaFunc(schema):
    return StructType([schema["numbers"]])
l = (Lambda()
.setTransformFunc(transformFunc)
.setTransformSchemaFunc(transformSchemaFunc))
import com.microsoft.azure.synapse.ml.stages._
import org.apache.spark.sql.types.{StringType, StructType}
val df = (Seq(
(0, 0.toDouble, "guitars", "drums", 1.toLong, true),
(1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
(2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
.toDF("numbers", "doubles", "words", "more", "longs", "booleans"))
val lambda = (new Lambda()
.setTransform(df => df.select("numbers"))
.setTransformSchema(schema => new StructType(Array(schema("numbers")))))
lambda.transform(df).show()
Python API: Lambda | Scala API: Lambda | Source: Lambda |
DynamicMiniBatchTransformer
- Python
- Scala
from synapse.ml.stages import *
from pyspark.sql.types import StringType, StructType
df = (spark.createDataFrame([(_, "foo") for _ in range(1, 11)], ["in1", "in2"]))
dmbt = DynamicMiniBatchTransformer()
dmbt.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (1 until 11).map(x => (x, "foo")).toDF("in1", "in2")
val dmbt = new DynamicMiniBatchTransformer()
dmbt.transform(df).show()
Python API: DynamicMiniBatchTransformer | Scala API: DynamicMiniBatchTransformer | Source: DynamicMiniBatchTransformer |
FixedMiniBatchTransformer
- Python
- Scala
from synapse.ml.stages import *
fmbt = (FixedMiniBatchTransformer()
.setBuffered(True)
.setBatchSize(3))
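A usage sketch showing the batching on a small DataFrame (column names are illustrative):
# Ten rows grouped into batches of three; each output row holds arrays of the input values.
df = spark.createDataFrame([(i, "foo") for i in range(1, 11)], ["in1", "in2"])
fmbt.transform(df).show(truncate=False)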
import com.microsoft.azure.synapse.ml.stages._
val fmbt = (new FixedMiniBatchTransformer()
.setBuffered(true)
.setBatchSize(3))
Python API: FixedMiniBatchTransformer | Scala API: FixedMiniBatchTransformer | Source: FixedMiniBatchTransformer |
TimeIntervalMiniBatchTransformer
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([(_, "foo") for _ in range(1, 11)], ["in1", "in2"]))
timbt = (TimeIntervalMiniBatchTransformer()
.setMillisToWait(1000)
.setMaxBatchSize(30))
timbt.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (1 until 11).map(x => (x, "foo")).toDF("in1", "in2")
val timbt = (new TimeIntervalMiniBatchTransformer()
.setMillisToWait(1000)
.setMaxBatchSize(30))
timbt.transform(df).show()
Python API: TimeIntervalMiniBatchTransformer | Scala API: TimeIntervalMiniBatchTransformer | Source: TimeIntervalMiniBatchTransformer |
FlattenBatch
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([(_, "foo") for _ in range(1, 11)], ["in1", "in2"]))
transDF = DynamicMiniBatchTransformer().transform(df)
fb = FlattenBatch()
fb.transform(transDF).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (1 until 11).map(x => (x, "foo")).toDF("in1", "in2")
val transDF = new DynamicMiniBatchTransformer().transform(df)
val fb = new FlattenBatch()
fb.transform(transDF).show()
Python API: FlattenBatch | Scala API: FlattenBatch | Source: FlattenBatch |
RenameColumn
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, 0, "guitars", "drums", 1, True),
(1, 1, "piano", "trumpet", 2, False),
(2, 2, "bass", "cymbals", 3, True)
], ["numbers", "doubles", "words", "more", "longs", "booleans"]))
rc = RenameColumn().setInputCol("words").setOutputCol("numbers")
rc.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, 0.toDouble, "guitars", "drums", 1.toLong, true),
(1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
(2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
.toDF("numbers", "doubles", "words", "more", "longs", "booleans"))
val rc = new RenameColumn().setInputCol("words").setOutputCol("numbers")
rc.transform(df).show()
Python API: RenameColumn | Scala API: RenameColumn | Source: RenameColumn |
Repartition
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, "guitars", "drums"),
(1, "piano", "trumpet"),
(2, "bass", "cymbals"),
(3, "guitars", "drums"),
(4, "piano", "trumpet"),
(5, "bass", "cymbals"),
(6, "guitars", "drums"),
(7, "piano", "trumpet"),
(8, "bass", "cymbals"),
(9, "guitars", "drums"),
(10, "piano", "trumpet"),
(11, "bass", "cymbals")
], ["numbers", "words", "more"]))
repartition = Repartition().setN(1)
repartition.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, "guitars", "drums"),
(1, "piano", "trumpet"),
(2, "bass", "cymbals"),
(3, "guitars", "drums"),
(4, "piano", "trumpet"),
(5, "bass", "cymbals"),
(6, "guitars", "drums"),
(7, "piano", "trumpet"),
(8, "bass", "cymbals"),
(9, "guitars", "drums"),
(10, "piano", "trumpet"),
(11, "bass", "cymbals")
).toDF("numbers", "words", "more"))
val repartition = new Repartition().setN(1)
repartition.transform(df).show()
Python API: Repartition | Scala API: Repartition | Source: Repartition |
SelectColumns
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, 0.0, "guitars", "drums", 1, True),
(1, 1.0, "piano", "trumpet", 2, False),
(2, 2.0, "bass", "cymbals", 3, True)
], ["numbers", "words", "more"]))
sc = SelectColumns().setCols(["words", "more"])
sc.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, 0.toDouble, "guitars", "drums", 1.toLong, true),
(1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
(2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
.toDF("numbers", "doubles", "words", "more", "longs", "booleans"))
val sc = new SelectColumns().setCols(Array("words", "more"))
sc.transform(df).show()
Python API: SelectColumns | Scala API: SelectColumns | Source: SelectColumns |
StratifiedRepartition
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, "Blue", 2),
(0, "Red", 2),
(0, "Green", 2),
(1, "Purple", 2),
(1, "Orange", 2),
(1, "Indigo", 2),
(2, "Violet", 2),
(2, "Black", 2),
(2, "White", 2),
(3, "Gray", 2),
(3, "Yellow", 2),
(3, "Cerulean", 2)
], ["values", "colors", "const"]))
sr = StratifiedRepartition().setLabelCol("values").setMode("equal")
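As in the Scala tab, the repartition is applied with transform:
sr.transform(df).show()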
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, "Blue", 2),
(0, "Red", 2),
(0, "Green", 2),
(1, "Purple", 2),
(1, "Orange", 2),
(1, "Indigo", 2),
(2, "Violet", 2),
(2, "Black", 2),
(2, "White", 2),
(3, "Gray", 2),
(3, "Yellow", 2),
(3, "Cerulean", 2)
).toDF("values", "colors", "const"))
val sr = new StratifiedRepartition().setLabelCol("values").setMode("equal")
sr.transform(df).show()
Python API: StratifiedRepartition | Scala API: StratifiedRepartition | Source: StratifiedRepartition |
SummarizeData
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
(0, 0.0, "guitars", "drums", 1, True),
(1, 1.0, "piano", "trumpet", 2, False),
(2, 2.0, "bass", "cymbals", 3, True)
], ["numbers", "doubles", "words", "more", "longs", "booleans"]))
summary = SummarizeData()
summary.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
(0, 0.toDouble, "guitars", "drums", 1.toLong, true),
(1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
(2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
.toDF("numbers", "doubles", "words", "more", "longs", "booleans"))
val summary = new SummarizeData()
summary.transform(df).show()
Python API: SummarizeData | Scala API: SummarizeData | Source: SummarizeData |
TextPreprocessor
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
("The happy sad boy drank sap", ),
("The hater sad doy drank sap", ),
("foo", ),
("The hater sad doy aABc0123456789Zz_", )
], ["words1"]))
testMap = {"happy": "sad", "hater": "sap",
"sad": "sap", "sad doy": "sap"}
textPreprocessor = (TextPreprocessor()
.setNormFunc("lowerCase")
.setMap(testMap)
.setInputCol("words1")
.setOutputCol("out"))
textPreprocessor.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
("The happy sad boy drank sap", ),
("The hater sad doy drank sap", ),
("foo", ),
("The hater sad doy aABc0123456789Zz_", ))
.toDF("words1"))
val testMap = Map[String, String] (
"happy" -> "sad",
"hater" -> "sap",
"sad" -> "sap",
"sad doy" -> "sap"
)
val textPreprocessor = (new TextPreprocessor()
.setNormFunc("lowerCase")
.setMap(testMap)
.setInputCol("words1")
.setOutputCol("out"))
textPreprocessor.transform(df).show()
Python API: TextPreprocessor | Scala API: TextPreprocessor | Source: TextPreprocessor |
UDFTransformer
- Python
- Scala
from synapse.ml.stages import *
from pyspark.sql.functions import udf
df = (spark.createDataFrame([
(0, 0.0, "guitars", "drums", 1, True),
(1, 1.0, "piano", "trumpet", 2, False),
(2, 2.0, "bass", "cymbals", 3, True)
], ["numbers", "doubles", "words", "more", "longs", "booleans"]))
stringToIntegerUDF = udf(lambda x: 1)
udfTransformer = (UDFTransformer()
.setUDF(stringToIntegerUDF)
.setInputCol("numbers")
.setOutputCol("out"))
udfTransformer.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
import org.apache.spark.sql.functions.udf
val df = (Seq(
(0, 0.toDouble, "guitars", "drums", 1.toLong, true),
(1, 1.toDouble, "piano", "trumpet", 2.toLong, false),
(2, 2.toDouble, "bass", "cymbals", 3.toLong, true))
.toDF("numbers", "doubles", "words", "more", "longs", "booleans"))
val stringToIntegerUDF = udf((_: String) => 1)
val udfTransformer = (new UDFTransformer()
.setUDF(stringToIntegerUDF)
.setInputCol("numbers")
.setOutputCol("out"))
udfTransformer.transform(df).show()
Python API: UDFTransformer | Scala API: UDFTransformer | Source: UDFTransformer |
UnicodeNormalize
- Python
- Scala
from synapse.ml.stages import *
df = (spark.createDataFrame([
("Schön", 1),
("Scho\u0308n", 1),
(None, 1)
], ["words1", "dummy"]))
unicodeNormalize = (UnicodeNormalize()
.setForm("NFC")
.setInputCol("words1")
.setOutputCol("norm1"))
unicodeNormalize.transform(df).show()
import com.microsoft.azure.synapse.ml.stages._
val df = (Seq(
("Schön", 1),
("Scho\u0308n", 1),
(null, 1))
.toDF("words1", "dummy"))
val unicodeNormalize = (new UnicodeNormalize()
.setForm("NFC")
.setInputCol("words1")
.setOutputCol("norm1"))
unicodeNormalize.transform(df).show()
Python API: UnicodeNormalize | Scala API: UnicodeNormalize | Source: UnicodeNormalize |
Train
ComputeModelStatistics
- Python
- Scala
from synapse.ml.train import *
from numpy import random
df = spark.createDataFrame(
[(float(random.rand()), float(random.rand())) for _ in range(2048)], ["label", "prediction"]
)
cms = (ComputeModelStatistics()
.setLabelCol("label")
.setScoredLabelsCol("prediction")
.setEvaluationMetric("classification"))
cms.transform(df).show()
import com.microsoft.azure.synapse.ml.train._
import scala.util.Random
val rand = new Random(1337)
val df = (Seq.fill(2048)(rand.nextDouble())
.zip(Seq.fill(2048)(rand.nextDouble()))
.toDF("label", "prediction"))
val cms = (new ComputeModelStatistics()
.setLabelCol("label")
.setScoredLabelsCol("prediction")
.setEvaluationMetric("classification"))
cms.transform(df).show()
Python API: ComputeModelStatistics | Scala API: ComputeModelStatistics | Source: ComputeModelStatistics |
ComputePerInstanceStatistics
- Python
- Scala
from synapse.ml.train import *
cps = (ComputePerInstanceStatistics()
.setLabelCol("label")
.setScoredLabelsCol("LogRegScoredLabelsCol")
.setScoresCol("LogRegScoresCol")
.setScoredProbabilitiesCol("LogRegProbCol")
.setEvaluationMetric("classification"))
import com.microsoft.azure.synapse.ml.train._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.FastVectorAssembler
val logisticRegression = (new LogisticRegression()
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setMaxIter(10)
.setLabelCol("label")
.setPredictionCol("LogRegScoredLabelsCol")
.setRawPredictionCol("LogRegScoresCol")
.setProbabilityCol("LogRegProbCol")
.setFeaturesCol("features"))
val dataset = spark.createDataFrame(Seq(
(0.0, 2, 0.50, 0.60, 0.0),
(1.0, 3, 0.40, 0.50, 1.0),
(2.0, 4, 0.78, 0.99, 2.0),
(3.0, 5, 0.12, 0.34, 3.0),
(0.0, 1, 0.50, 0.60, 0.0),
(1.0, 3, 0.40, 0.50, 1.0),
(2.0, 3, 0.78, 0.99, 2.0),
(3.0, 4, 0.12, 0.34, 3.0),
(0.0, 0, 0.50, 0.60, 0.0),
(1.0, 2, 0.40, 0.50, 1.0),
(2.0, 3, 0.78, 0.99, 2.0),
(3.0, 4, 0.12, 0.34, 3.0)))
.toDF("label", "col1", "col2", "col3", "prediction")
val assembler = (new FastVectorAssembler()
.setInputCols(Array("col1", "col2", "col3"))
.setOutputCol("features"))
val assembledDataset = assembler.transform(dataset)
val model = logisticRegression.fit(assembledDataset)
val scoredData = model.transform(assembledDataset)
val cps = (new ComputePerInstanceStatistics()
.setLabelCol("label")
.setScoredLabelsCol("LogRegScoredLabelsCol")
.setScoresCol("LogRegScoresCol")
.setScoredProbabilitiesCol("LogRegProbCol")
.setEvaluationMetric("classification"))
cps.transform(scoredData).show()
Python API: ComputePerInstanceStatistics | Scala API: ComputePerInstanceStatistics | Source: ComputePerInstanceStatistics |